package com.chenjl.trace.transport.extflume;

import java.io.IOException;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.chenjl.trace.model.Endpoint;
import com.chenjl.trace.model.Span;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoAnnotation;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoBinaryAnnotation;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoEndpoint;
import com.chenjl.trace.protobuf.TraceProtobuf.ProtoSpan;
/**
 * Flume sink that takes events from its channel, decodes each event body as a
 * {@link ProtoSpan} and indexes the resulting {@link Span} (serialized as JSON)
 * into Elasticsearch through a {@link RestHighLevelClient}.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code es_hosts} - comma-separated list of {@code host:port} entries</li>
 *   <li>{@code es_index_name} - target index name</li>
 *   <li>{@code es_type_name} - target mapping type name</li>
 * </ul>
 *
 * 2018-10-25 10:56:59
 * @author chenjinlong
 */
public class ElasticSearchSink extends AbstractSink implements Configurable {
	private static final Logger log = LoggerFactory.getLogger(ElasticSearchSink.class);
	
	private String esHosts = null;
	private String indexName = null;
	private String typeName = null;
	private RestHighLevelClient restHighLevelClient = null;
	
	
	/**
	 * Reads the Elasticsearch connection and index settings from the sink context.
	 *
	 * @param context sink configuration supplied by Flume
	 */
	@Override
	public void configure(Context context) {
		this.esHosts = context.getString("es_hosts");
		this.indexName = context.getString("es_index_name");
		this.typeName = context.getString("es_type_name");
		
		log.info("ElasticSearchSink读取配置-->>---esHosts: {},indexName : {}, typeName : {}",esHosts,indexName,typeName);
	}
	/**
	 * Builds the Elasticsearch client from the configured host list and then
	 * marks the sink as started.
	 *
	 * @throws IllegalArgumentException if {@code es_hosts} is missing or contains
	 *         an entry that is not of the form {@code host:port}
	 */
	@Override
	public synchronized void start() {
		// Parse and build the client first so that a configuration error does not
		// leave the sink flagged as started without a usable client.
		HttpHost[] httpHosts = parseHosts(esHosts);
		RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
		restHighLevelClient = new RestHighLevelClient(restClientBuilder);
		
		super.start();
	}
	/**
	 * Marks the sink as stopped and closes the Elasticsearch client if one was created.
	 */
	@Override
	public synchronized void stop() {
		super.stop();
		
		log.info("关闭ElasticSearchSink-->>---");
		RestHighLevelClient client = this.restHighLevelClient;
		this.restHighLevelClient = null;
		// start() may have failed or never been called; nothing to close then.
		if(client == null) {
			return;
		}
		try {
			client.close();
		}
		catch (IOException e) {
			// The Throwable is passed as the last argument without a placeholder
			// so that SLF4J logs it with its stack trace.
			log.error("关闭RestHighLevelClient出现未知的异常",e);
		}
	}
	/**
	 * Takes at most one event from the channel inside a transaction and indexes it.
	 *
	 * <p>An event whose body cannot be decoded as a {@link ProtoSpan} is logged
	 * and dropped (the transaction is committed), because rolling it back would
	 * hand the same undecodable event back on every subsequent call.
	 *
	 * @return {@link Status#READY} when an event was consumed, {@link Status#BACKOFF}
	 *         when the channel was empty or delivery failed
	 * @throws EventDeliveryException declared by the sink contract; delivery failures
	 *         are logged and reported as {@link Status#BACKOFF} instead
	 */
	@Override
	public Status process() throws EventDeliveryException {
		Status status = Status.READY;
		Channel channel = super.getChannel();
		Transaction transaction = channel.getTransaction();
		transaction.begin();
		
		try {
			Event event = channel.take();
			if(event == null) {
				status = Status.BACKOFF;
			}
			else {
				ProtoSpan protoSpan = this.parseSpan(event);
				if(protoSpan != null) {
					this.sendToStorage(protoSpan);
				}
			}
			
			// An opened transaction must always end in commit or rollback.
			transaction.commit();
		}
		catch(Throwable t) {
			log.error("ElasticSearchSink process-->>---,appear exception",t);
			transaction.rollback();
			status = Status.BACKOFF;
			
			// Errors (OutOfMemoryError etc.) must not be swallowed.
			if(t instanceof Error) {
				throw (Error) t;
			}
		}
		finally {
			transaction.close();
		}
		
		log.info("ElasticSearchSink process-->>---, status : {}",status);
		return status;
	}
	/**
	 * Splits the configured host list into {@link HttpHost} instances using the
	 * {@code http} scheme.
	 *
	 * @param hostList comma-separated {@code host:port} entries
	 * @return one {@link HttpHost} per entry, in the configured order
	 * @throws IllegalArgumentException if the list is empty or an entry is malformed
	 */
	private HttpHost[] parseHosts(String hostList) {
		if(hostList == null || hostList.trim().isEmpty()) {
			throw new IllegalArgumentException(
					"ElasticSearchSink requires 'es_hosts' as a comma-separated list of host:port entries");
		}
		
		String[] entries = hostList.split(",");
		HttpHost[] httpHosts = new HttpHost[entries.length];
		
		for(int i=0;i<entries.length;i++) {
			String entry = entries[i].trim();
			String[] parts = entry.split(":");
			if(parts.length < 2 || parts[0].trim().isEmpty()) {
				throw new IllegalArgumentException(
						"Invalid 'es_hosts' entry '" + entry + "', expected host:port");
			}
			
			String nodeUrl = parts[0].trim();
			int nodePort;
			try {
				nodePort = Integer.parseInt(parts[1].trim());
			}
			catch (NumberFormatException e) {
				throw new IllegalArgumentException(
						"Invalid port in 'es_hosts' entry '" + entry + "'",e);
			}
			
			httpHosts[i] = new HttpHost(nodeUrl,nodePort,"http");
			log.info("启动ElasticSearchSink-->>---, 解析ElasticSearch hosts, nodeUrl: {}, nodePort : {}",nodeUrl,nodePort);
		}
		return httpHosts;
	}
	/**
	 * Decodes the event body as a {@link ProtoSpan}.
	 *
	 * @param event event taken from the channel
	 * @return the decoded span, or {@code null} if the body is not a valid message
	 */
	private ProtoSpan parseSpan(Event event) {
		try {
			return ProtoSpan.parseFrom(event.getBody());
		}
		catch (IOException e) {
			// Only the decode step is covered here, so an I/O failure while talking
			// to Elasticsearch is never mistaken for a malformed event.
			log.error("ElasticSearchSink dropping event whose body is not a valid ProtoSpan",e);
			return null;
		}
	}
	/**
	 * Converts the span to the storage model and indexes it into Elasticsearch.
	 *
	 * @param protoSpan decoded span to store
	 * @throws IOException if the index request fails
	 */
	private void sendToStorage(ProtoSpan protoSpan) throws IOException {
		Span traceSpan = this.toSpan(protoSpan);
		
		IndexRequest indexRequest = new IndexRequest(indexName,typeName);
		indexRequest.source(JSONObject.toJSONString(traceSpan), XContentType.JSON);
		IndexResponse indexResponse = restHighLevelClient.index(indexRequest);
		
		String docId = indexResponse.getId();
		log.info("ElasticSearchSink发送消息到ES完成，返回文档id : {} , spanId :{} ",docId,protoSpan.getId());
	}
	/**
	 * Copies the scalar fields, annotations and binary annotations of a
	 * {@link ProtoSpan} into a new {@link Span}.
	 *
	 * @param protoSpan source message
	 * @return populated storage model
	 */
	private Span toSpan(ProtoSpan protoSpan) {
		Span traceSpan = new Span();
		traceSpan.setTraceId(protoSpan.getTraceId());
		traceSpan.setParentId(protoSpan.getParentId());
		traceSpan.setId(protoSpan.getId());
		traceSpan.setName(protoSpan.getName());
		traceSpan.setSpanType(protoSpan.getSpanType());
		traceSpan.setTimestamp(protoSpan.getTimestamp());
		traceSpan.setDuration(protoSpan.getDuration());
		traceSpan.setDomainName(protoSpan.getDomainName());
		traceSpan.setIp(protoSpan.getIp());
		
		// Event-type annotations.
		for(ProtoAnnotation protoAnnotation : protoSpan.getAnnotationsList()) {
			Endpoint endpoint = this.toEndpoint(protoAnnotation.getEndpoint());
			Long timestamp = protoAnnotation.getTimestamp();
			String value = protoAnnotation.getValue();
			traceSpan.addAnnotation(endpoint,timestamp,value);
		}
		
		// RPC (key/value) annotations.
		for(ProtoBinaryAnnotation protoBinaryAnnotation : protoSpan.getBinaryAnnotationsList()) {
			Endpoint endpoint = this.toEndpoint(protoBinaryAnnotation.getEndpoint());
			String key = protoBinaryAnnotation.getKey();
			String value = protoBinaryAnnotation.getValue();
			traceSpan.addBinaryAnnotation(endpoint,key,value);
		}
		return traceSpan;
	}
	/**
	 * Copies service name, host and port from a {@link ProtoEndpoint} into a new {@link Endpoint}.
	 *
	 * @param protoEndpoint source endpoint
	 * @return populated storage endpoint
	 */
	private Endpoint toEndpoint(ProtoEndpoint protoEndpoint) {
		Endpoint endpoint = new Endpoint();
		endpoint.setServiceName(protoEndpoint.getServiceName());
		endpoint.setHost(protoEndpoint.getHost());
		endpoint.setPort(protoEndpoint.getPort());
		return endpoint;
	}
}